{
 "cells": [
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 部署异步推理服务\n",
    "\n",
    "在复杂的模型推理场景中，例如AIGC、视频处理等场景中，模型服务推理耗时较长，存在长连接超时导致请求失败或实例负载不均衡等问题，不适用于实时推理的场景。针对以上问题，PAI提供了异步推理服务，用于支持类似的场景，用户可以在提交预测请求之后，通过轮询或是订阅的方式获取到推理服务的预测结果。\n",
    "\n",
    "在当前文档中，我们将介绍如何使用PAI Python SDK在PAI上部署和调用异步推理服务。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "## 费用说明\n",
    "\n",
    "本示例将会使用以下云产品，并产生相应的费用账单：\n",
    "\n",
    "- PAI-EAS：部署推理服务，详细计费说明请参考[PAI-EAS计费说明](https://help.aliyun.com/zh/pai/product-overview/billing-of-eas)\n",
    "- OSS：存储推理服务代码等，详细计费说明请参考[OSS计费概述](https://help.aliyun.com/zh/oss/product-overview/billing-overview)\n",
    "\n",
    "> 通过参与云产品免费试用，使用**指定资源机型**，可以免费试用PAI产品，具体请参考[PAI免费试用](https://help.aliyun.com/zh/pai/product-overview/free-quota-for-new-users)。\n",
    "\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 准备工作\n",
    "\n",
    "我们可以通过以下命令安装PAI Python SDK。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "\n",
    "!python -m pip install --upgrade pai"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "SDK需要配置访问阿里云服务需要的 AccessKey，以及当前使用的工作空间和OSS Bucket。在PAI Python SDK安装之后，通过在 **命令行终端** 中执行以下命令，按照引导配置密钥，工作空间等信息。\n",
    "\n",
    "\n",
    "```shell\n",
    "\n",
    "# 以下命令，请在 命令行终端 中执行.\n",
    "\n",
    "python -m pai.toolkit.config\n",
    "\n",
    "```\n",
    "\n",
    "我们可以通过以下代码验证当前的配置。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pai\n",
    "from pai.session import get_default_session\n",
    "\n",
    "print(pai.__version__)\n",
    "sess = get_default_session()\n",
    "\n",
    "assert sess.workspace_name is not None\n",
    "print(sess.workspace_name)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 部署异步推理服务模型\n",
    "\n",
    "将模型部署为异步推理服务与部署标准的在线推理服务类似，用户仅需在部署时（`Model.deploy`)，传递`service_type=ServicType.Async`即可。\n",
    "\n",
    "当前流程中，我们将使用镜像部署的模式，部署一个异步的推理服务。\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "# 准备异步推理服务的应用代码目录\n",
    "!mkdir -p serve_src/"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "通过`%%writefile`指令，我们将推理服务代码写入到`serve_src/run.py`文件中。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%writefile serve_src/run.py\n",
    "import asyncio\n",
    "from random import random\n",
    "\n",
    "from fastapi import FastAPI, Request\n",
    "import uvicorn, json, datetime\n",
    "\n",
    "# 默认模型加载路径\n",
    "model_path = \"/eas/workspace/model/\"\n",
    "\n",
    "app = FastAPI()\n",
    "\n",
    "\n",
    "@app.post(\"/\")\n",
    "async def create_item(request: Request):\n",
    "    print(\"Make mock prediction starting ...\")\n",
    "    # Mock prediction\n",
    "    await asyncio.sleep(15)\n",
    "    print(\"Prediction finished.\")\n",
    "    return [random() for _ in range(10)]\n",
    "\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    uvicorn.run(app, host=\"0.0.0.0\", port=8000, workers=1)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "我们将使用PAI提供的PyTorch推理镜像部署以上的模型。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pai.model import Model, container_serving_spec\n",
    "from pai.predictor import ServiceType\n",
    "from pai.image import retrieve, ImageScope\n",
    "\n",
    "m = Model(\n",
    "    inference_spec=container_serving_spec(\n",
    "        source_dir=\"serve_src\",\n",
    "        command=\"python run.py\",\n",
    "        image_uri=retrieve(\n",
    "            \"PyTorch\",\n",
    "            framework_version=\"1.10\",\n",
    "            accelerator_type=\"gpu\",\n",
    "            image_scope=ImageScope.INFERENCE,\n",
    "        ),\n",
    "        requirements=[\n",
    "            \"fastapi\",\n",
    "            \"uvicorn\",\n",
    "        ],\n",
    "    )\n",
    "    # 用户可以通过`model_data`参数，传递一个OSS上的模型。相应的模型会被加载到推理服务的容器中。\n",
    "    # model_data=\"oss://<YourOssBucket>/path/to/model/\"\n",
    ")"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "通过设置部署服务的`service_type=ServiceType.Async`参数，我们可以将模型部署为异步推理服务。异步推理服务使用分别使用输入队列（source)和输出队列（sink）保存预测请求和预测结果。通过`options`参数，可以配置队列使用的资源，队列最大长度，是否开启自动驱逐等高阶参数。异步服务支持的完整的高阶参数，请参考文档：[异步服务-参数配置](https://help.aliyun.com/document_detail/476812.html?#section-gor-gne-gtq)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pai.predictor import AsyncPredictor\n",
    "from pai.common.utils import random_str\n",
    "\n",
    "\n",
    "service_name = f\"async_service_example_{random_str(6)}\"\n",
    "\n",
    "p: AsyncPredictor = m.deploy(\n",
    "    service_name=service_name,\n",
    "    instance_type=\"ecs.c6.large\",\n",
    "    # 设置当前部署的服务类型为异步服务\n",
    "    service_type=ServiceType.Async,\n",
    "    # 用户可以通过options字段配置高阶参数\n",
    "    options={\n",
    "        # 异步推理详细参数文档： https://help.aliyun.com/document_detail/476812.html\n",
    "        \"queue.cpu\": 2,  # 队列使用的CPU核数，默认为1\n",
    "        \"queue.memory\": 2048,  # 异步服务使用过的队列内存，单位为MB\n",
    "    },\n",
    ")\n",
    "\n",
    "print()\n",
    "\n",
    "print(p)\n",
    "print(p.service_name)\n",
    "print(p.access_token)"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 调用推理服务\n",
    "\n",
    "用户发送调用异步队列服务与请求同步推理服务的方式相同，但是异步推理服务会立即返回本次预测请求的`RequestId`，而不是预测结果。用户可以通过轮询获取到推理服务的预测结果。\n",
    "\n",
    "- **用户客户端**发送推理请求，加入到推理服务的输入队列中，PAI-EAS返回请求的RequestId。\n",
    "- PAI处理输入队列中的请求，转发给到**用户的推理服务**，推理服务处理完请求后，将结果写入到输出队列中\n",
    "- **用户客户端**可以通过RequestId轮询，可以获取到**用户推理服务**的预测结果\n",
    "\n",
    "\n",
    "PAI Python SDK提供了`AsyncPredictor`，支持用户更加简单得调用异步推理服务。"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 调用异步推理服务\n",
    "\n",
    "`AsyncPredictor`提供了`predict`和`raw_predict`方法发送预测请求，它们都会返回一个`AsyncTask`，用户可以通过`AsyncTask.result()`获取预测结果。 \n",
    "\n",
    "二者的区别在于`predict`方法会使用`Serializer`对象对输入数据进行序列化，对预测结果进行反序列化，而`raw_predict`方法直接将输入数据发送给异步推理服务，返回HTTP响应结果（`RawResponse`)。\n",
    "\n",
    "```python\n",
    "\n",
    "from pai.predictor import AsyncPredictor, AsyncTask\n",
    "from pai.serializer import JsonSerializer\n",
    "\n",
    "p = AsyncPredictor(service_name='test_async_service', serializer=JsonSerializer())\n",
    "\n",
    "t1: AsyncTask = p.predict(data={\"some\": \"data\"})\n",
    "# result是推理服务的响应结果(Response Body)，经过Serialzier.deserialize处理后返回的结果.\n",
    "result = t1.result()\n",
    "\n",
    "\n",
    "t2: AsyncTask = p.raw_predict(data=b'{\"some\": \"data\"}')\n",
    "resp: RawResponse = t2.result()\n",
    "print(resp.status_code, resp.content)\n",
    "\n",
    "```\n",
    "\n",
    "`AsyncPredictor`会维护一个线程池，通过一个线程去发送推理请求，并等待请求处理完成。用户可以通过`max_workers`参数配置线程池的大小。\n",
    "\n",
    "```python\n",
    "\n",
    "p = AsyncPredictor(service_name='test_async_service', max_workers=20)\n",
    "\n",
    "```\n",
    "\n",
    "当用户需要在异步请求完成之后，对于响应的结果进行处理时，可以通过`callback`参数传递一个回调函数。回调函数的参数为`AsyncTask.result()`，也就实际响应的结果。\n",
    "\n",
    "\n"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "以下的示例代码中，我们将使用`AsyncPredictor`调用异步推理服务，并通过会回调函数处理预测结果。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pai.predictor import RawResponse, AsyncTask\n",
    "import time\n",
    "\n",
    "# 结果列表\n",
    "results = []\n",
    "\n",
    "\n",
    "# 定义回调函数\n",
    "def callback_fn(resp: RawResponse):\n",
    "    print(\"Callback: get prediction result \", resp.json())\n",
    "    results.append(resp.json())\n",
    "\n",
    "\n",
    "# 发送预测请求，使用回调函数处理预测结果。\n",
    "task: AsyncTask = p.raw_predict(\n",
    "    data=b\"test_data\",\n",
    "    callback=callback_fn,\n",
    ")\n",
    "\n",
    "# result() 方法等待预测完成\n",
    "resp: RawResponse = task.result()\n",
    "print(resp.json())\n",
    "\n",
    "# 等待回调函数执行完成\n",
    "time.sleep(1)\n",
    "\n",
    "print(results)\n",
    "assert len(results) == 1"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "以下示例中，我们批量发送异步推理请求，然后等待所有的请求完成。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tasks = []\n",
    "\n",
    "for i in range(10):\n",
    "    task: AsyncTask = p.raw_predict(\n",
    "        data=b\"test_data\",\n",
    "        callback=lambda x: print(\"Prediction result: \", x.json()),\n",
    "    )\n",
    "    tasks.append(task)\n",
    "\n",
    "prediction_results = [t.result().json() for t in tasks]\n",
    "\n",
    "print(prediction_results)\n",
    "print(len(prediction_results))"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 使用异步API调用推理服务\n",
    "\n",
    "`AsyncPredictor` 提供了异步API `raw_predict_async` 和 `predict_async`，支持用户使用Python提供的异步框架(asyncio)调用推理服务。\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pai.predictor import RawResponse\n",
    "\n",
    "# 使用异步API调用异步推理服务\n",
    "res: RawResponse = await p.raw_predict_async(data=b\"test_data\")\n",
    "\n",
    "print(res.status_code)\n",
    "print(res.content)\n",
    "print(res.json())"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "通过SDK提供的异步API，我们可以不借助于线程池，批量发送异步预测请求。\n",
    "\n",
    "以下的示例中，我们将使用异步API，批量发送异步预测请求，等待推理完成，并使用回调函数打印预测请求结果。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "\n",
    "\n",
    "# 定义回调函数\n",
    "def task_done_cb(task: asyncio.Task):\n",
    "    if task.exception():\n",
    "        raise task.exception()\n",
    "    else:\n",
    "        print(\"Prediction result: \", task.result().json())\n",
    "\n",
    "\n",
    "# 使用异步API批量调用异步推理服务\n",
    "async def batch_predict():\n",
    "    tasks = []\n",
    "    for _ in range(10):\n",
    "        task = asyncio.create_task(\n",
    "            # raw_predict_async 是一个coroutine\n",
    "            p.raw_predict_async(\n",
    "                data=b\"test_data\",\n",
    "            )\n",
    "        )\n",
    "        # 调用完成之后，打印调用返回结果\n",
    "        task.add_done_callback(task_done_cb)\n",
    "\n",
    "        tasks.append(task)\n",
    "    # 等待所有任务完成\n",
    "    return await asyncio.gather(*tasks, return_exceptions=True)\n",
    "\n",
    "\n",
    "batch_results = await batch_predict()\n",
    "\n",
    "\n",
    "for result in batch_results:\n",
    "    print(result.json())"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "测试完成之后，可以使用`delete_service`方法删除对应服务，释放资源。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "p.delete_service()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "pai-dev-py38",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.19"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
